package drds.plus.datasource.api;

import com.alibaba.druid.pool.DruidDataSource;
import drds.plus.datasource.configuration.DataSourceConnectionProperties;
import drds.plus.datasource.configuration.DatabaseStatus;
import drds.plus.datasource.configuration.DatasourceConfiguration;
import drds.plus.datasource.connection_restrict.*;
import drds.plus.datasource.exception_sorter.ExceptionSorter;
import drds.plus.datasource.exception_sorter.ExceptionSorterImpl;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Wraps a {@link DruidDataSource} and adds, on top of plain connection hand-out:
 * <ul>
 *   <li>a thread-count limit ({@link #recordThreadCount()}),</li>
 *   <li>an optional {@link ConnectionRestrictor} that hands out connection slots,</li>
 *   <li>an availability switch ({@link SmoothTransitionDevice}): once a fatal
 *       {@link SQLException} is seen the data source is marked unavailable, a single
 *       thread at a time probes it, and every other caller fails fast.</li>
 * </ul>
 */
@Slf4j
public class DataSourceWrapper {

    /** Minimum time between two probes of an unavailable data source, in milliseconds. */
    protected static int retryBadDbInterval = 0; // milliseconds

    /**
     * Thread counter of this wrapper. When the data source is switched, a different wrapper
     * instance is used, so the two do not affect each other. The value reported while a switch
     * is in progress may therefore be inaccurate; because the old wrapper stays usable until it
     * is discarded, two wrappers effectively coexist in memory and the count is not inflated.
     */
    final AtomicInteger threadCount = new AtomicInteger(); // package-private
    final AtomicInteger threadCountReject = new AtomicInteger(); // package-private
    final AtomicInteger concurrentReadCount = new AtomicInteger(); // package-private
    final AtomicInteger concurrentWriteCount = new AtomicInteger(); // package-private
    /**
     * Write counter.
     */

    final AtomicInteger writeTimesReject = new AtomicInteger(); // package-private
    /**
     * Read counter.
     */

    final AtomicInteger readTimesReject = new AtomicInteger(); // package-private
    final String applicationId;
    private final DruidDataSource druidDataSource;

    /** Ensures that only one thread at a time probes an unavailable data source. */
    private final ReentrantLock lock = new ReentrantLock();
    protected DatasourceConfiguration runTimeDatasourceConfiguration;
    volatile TimeSliceFlowControl writeTimeSliceFlowControl; // package-private
    volatile TimeSliceFlowControl readTimeSliceFlowControl; // package-private
    volatile DataSourceConnectionProperties dataSourceConnectionProperties = new DataSourceConnectionProperties(); // package-private
    /**
     * Application-level connection restriction; stays {@code null} until {@link #init()} finds
     * a restriction list in the configuration.
     */
    private ConnectionRestrictor connectionRestrictor;
    private volatile SmoothTransitionDevice smoothTransitionDevice = new SmoothTransitionDevice(0);
    /** Timestamp (epoch millis) of the last probe of an unavailable data source. */
    private volatile long lastRetryTime = 0;

    /**
     * Creates the wrapper and copies the thread/concurrency limits from the configuration.
     *
     * @param applicationId                  identifier passed on to every {@link ConnectionWrapper}
     * @param runTimeDatasourceConfiguration configuration the limits and flow-control settings are read from
     * @param druidDataSource                underlying pool that physical connections are taken from
     */
    public DataSourceWrapper(String applicationId, DatasourceConfiguration runTimeDatasourceConfiguration, DruidDataSource druidDataSource) {
        this.applicationId = applicationId;
        this.runTimeDatasourceConfiguration = runTimeDatasourceConfiguration;
        this.druidDataSource = druidDataSource;

        rebuildFlowControls(runTimeDatasourceConfiguration.getTimeSliceInMillis());

        log.warn("set thread count restrict {}", runTimeDatasourceConfiguration.getThreadCountRestrict());
        this.dataSourceConnectionProperties.threadCountRestriction = runTimeDatasourceConfiguration.getThreadCountRestrict();

        log.info("set maxConcurrentReadRestrict {}", runTimeDatasourceConfiguration.getMaxConcurrentReadRestrict());
        this.dataSourceConnectionProperties.maxConcurrentReadRestrict = runTimeDatasourceConfiguration.getMaxConcurrentReadRestrict();

        log.info("set maxConcurrentWriteRestrict {}", runTimeDatasourceConfiguration.getMaxConcurrentWriteRestrict());
        this.dataSourceConnectionProperties.maxConcurrentWriteRestrict = runTimeDatasourceConfiguration.getMaxConcurrentWriteRestrict();
    }

    /**
     * @return the database status currently stored in the connection properties
     */
    public DatabaseStatus getDatabaseStatus() {
        return dataSourceConnectionProperties.dataBaseStatus;
    }

    /**
     * @param dataBaseStatus new database status to store in the connection properties
     */
    public void setDatabaseStatus(DatabaseStatus dataBaseStatus) {
        this.dataSourceConnectionProperties.dataBaseStatus = dataBaseStatus;
    }

    /**
     * Initializes the connection buckets: builds the {@link ConnectionRestrictor} when the
     * configuration provides a restriction entry list, otherwise leaves it unset.
     */
    public void init() {
        List<ConnectionRestrictEntry> connectionRestrictEntryList = runTimeDatasourceConfiguration.getConnectionRestrictEntryList();
        if (connectionRestrictEntryList != null) {
            this.connectionRestrictor = new ConnectionRestrictor(connectionRestrictEntryList);
        }

    }

    /**
     * Hands out a wrapped connection. This method only implements the availability gate
     * (tryLock-guarded probing and fail-fast); the actual acquisition is delegated to
     * {@link #getConnectionWrapper0()}.
     *
     * @return a wrapped connection
     * @throws SQLException     if acquisition fails with an {@link SQLException}; the original
     *                          exception is attached as the cause
     * @throws RuntimeException if the underlying pool failed; rethrown unchanged
     */
    public ConnectionWrapper getConnectionWrapper() throws SQLException {
        // Snapshot the volatile field so the whole call works against one device instance.
        SmoothTransitionDevice smoothTransitionDevice = this.smoothTransitionDevice;
        try {
            if (smoothTransitionDevice.isNotAvailable()) {
                boolean toTry = System.currentTimeMillis() - lastRetryTime > retryBadDbInterval;
                if (toTry && lock.tryLock()) {
                    try {
                        // Only one thread at a time keeps using this data source while it is marked unavailable.
                        ConnectionWrapper connectionWrapper = this.getConnectionWrapper0();
                        // The probing thread succeeded: mark available again (automatic recovery).
                        smoothTransitionDevice.setAvailable();
                        return connectionWrapper;
                    } finally {
                        lastRetryTime = System.currentTimeMillis();
                        lock.unlock();
                    }
                } else {
                    // All other threads fail fast.
                    throw new NotAvailableException(this.runTimeDatasourceConfiguration.getDatabaseName() + " isNotAvailable");
                }
            } else {
                if (smoothTransitionDevice.checkSmoothThrough()) {
                    return this.getConnectionWrapper0();
                } else {
                    // Rejected by the throttling that protects the data source right after a reset.
                    throw new NotAvailableException(this.runTimeDatasourceConfiguration.getDatabaseName() + " squeezeThrough rejected on fatal reset");
                }
            }
        } catch (SQLException e) {
            markNotAvailableIfFatal(smoothTransitionDevice, e);
            throw new SQLException("get connection failed,dbKey is " + (dataSourceConnectionProperties.datasourceId != null ? dataSourceConnectionProperties.datasourceId : this.runTimeDatasourceConfiguration.getDatabaseName()), e);
        } catch (RuntimeException e) {
            // getConnection() wraps pool failures (including SQLException) in a RuntimeException,
            // so without this branch a fatal connection error would never reach the sorter and the
            // data source would never be marked unavailable. The exception itself is rethrown
            // unchanged so callers observe the same type as before.
            Throwable cause = e.getCause();
            if (cause instanceof SQLException) {
                markNotAvailableIfFatal(smoothTransitionDevice, (SQLException) cause);
            }
            throw e;
        }
    }

    /**
     * Marks the given device as not available when the exception sorter classifies the
     * exception as fatal.
     */
    private void markNotAvailableIfFatal(SmoothTransitionDevice device, SQLException e) {
        ExceptionSorter exceptionSorter = new ExceptionSorterImpl();
        if (exceptionSorter.isFatalException(e)) {
            device.setNotAvailable();
        }
    }

    /**
     * Performs the actual acquisition: counts the thread, acquires a restriction slot when a
     * restrictor is configured, and wraps a pooled connection. On failure the slot (if any) is
     * released and the thread count is decremented before the exception is rethrown.
     */
    private ConnectionWrapper getConnectionWrapper0() throws SQLException {
        ConnectionRestrictSlot connectionRestrictSlot = null;
        try {
            recordThreadCount();
            if (connectionRestrictor != null) {
                connectionRestrictSlot = connectionRestrictor.tryAcquire(runTimeDatasourceConfiguration.getBlockingTimeout());
            }
            return new ConnectionWrapper(this.applicationId, this, getConnection(), connectionRestrictSlot);
        } catch (SQLException e) {
            undoAcquisition(connectionRestrictSlot);
            throw e;
        } catch (RuntimeException e) {
            undoAcquisition(connectionRestrictSlot);
            throw e;
        }

    }

    /**
     * Releases the restriction slot (when one was acquired) and undoes the thread-count
     * increment made by {@link #recordThreadCount()}.
     */
    private void undoAcquisition(ConnectionRestrictSlot connectionRestrictSlot) {
        if (connectionRestrictSlot != null) {
            connectionRestrictSlot.release();
        }
        threadCount.decrementAndGet();
    }

    /**
     * Takes a physical connection from the pool.
     *
     * @throws RuntimeException wrapping any failure of the pool; the message carries the
     *                          configured ip and port to ease troubleshooting
     */
    private Connection getConnection() throws SQLException {
        try {
            return druidDataSource.getConnection();
        } catch (Throwable e) {
            // Could not get a connection: record the ip/port to make troubleshooting easier.
            throw new RuntimeException("getConnectionWrapper failed[" + runTimeDatasourceConfiguration.getIp() + ":" + runTimeDatasourceConfiguration.getPort() + "]", e);
        }
    }

    /**
     * Increments the thread counter and enforces the thread-count restriction. A restriction of
     * {@code 0} disables the check. The counter is incremented even when the call is rejected;
     * the caller is responsible for decrementing it again.
     *
     * @throws SQLException if the incremented count exceeds the restriction
     */
    private void recordThreadCount() throws SQLException {
        int threadCountRestriction = dataSourceConnectionProperties.threadCountRestriction;
        int currentThreadCount = threadCount.incrementAndGet();
        if (threadCountRestriction != 0 && currentThreadCount > threadCountRestriction) {
            threadCountReject.incrementAndGet();
            throw new SQLException("max thread count : " + currentThreadCount);
        }
    }

    /**
     * Stores the data source id in the connection properties.
     */
    public synchronized void setDatasourceId(String datasourceId) {
        this.dataSourceConnectionProperties.datasourceId = datasourceId;
    }

    /**
     * Stores the ip in the connection properties.
     */
    public synchronized void setIp(String ip) {
        this.dataSourceConnectionProperties.ip = ip;
    }

    /**
     * Stores the port in the connection properties.
     */
    public synchronized void setPort(String port) {
        this.dataSourceConnectionProperties.port = port;
    }

    /**
     * Stores the database name in the connection properties.
     */
    public synchronized void setDatabaseName(String databaseName) {
        this.dataSourceConnectionProperties.databaseName = databaseName;
    }

    /**
     * Sets the time slice. The read and write flow controls are rebuilt so that the new value
     * actually takes effect (earlier versions did not re-create the schedule, which made this
     * setting a no-op).
     *
     * @param timeSliceInMillis length of one time slice in milliseconds; {@code 0} is ignored
     *                          and leaves the current flow controls untouched
     */
    public synchronized void setTimeSliceInMillis(int timeSliceInMillis) {
        if (timeSliceInMillis == 0) {
            log.info("timeSliceInMills is 0,return ");
            return;
        }
        rebuildFlowControls(timeSliceInMillis);

    }

    /**
     * Replaces the read and write flow controls with fresh instances that use the given time
     * slice and the restrict-times taken from the runtime configuration.
     */
    private void rebuildFlowControls(int timeSliceInMillis) {
        this.readTimeSliceFlowControl = new TimeSliceFlowControl("读流量", timeSliceInMillis, runTimeDatasourceConfiguration.getReadRestrictTimes());
        this.writeTimeSliceFlowControl = new TimeSliceFlowControl("写流量", timeSliceInMillis, runTimeDatasourceConfiguration.getWriteRestrictTimes());
    }

}
